refactor(cubestore): Replace invalidate_tables_cache flag with generi…#10683
refactor(cubestore): Replace invalidate_tables_cache flag with generi…#10683
Conversation
|
Claude finished @ovr's task in 2m 19s —— View job Code Review: Replace
|
| /// Set the callback that runs on the RW-loop thread after the RocksDB | ||
| /// commit succeeds. Overwrites any previously set callback. The callback | ||
| /// receives the store instance so it can reach shared state. Must not | ||
| /// panic and must not block on async work. | ||
| pub fn set_post_commit_callback<F>(&mut self, f: F) | ||
| where | ||
| F: FnOnce(&S) + Send + 'static, | ||
| { | ||
| self.post_commit_callback = Some(Box::new(f)); | ||
| } |
There was a problem hiding this comment.
Good doc comment. Two observations:
-
The "Must not panic" contract is important but unenforceable at compile time. Consider wrapping the callback invocation in
std::panic::catch_unwindat the call site (inwrite_operation_impl) to protect the RW-loop thread from a misbehaving callback. -
Since
set_post_commit_callbacksilently overwrites a previously set callback, if a write operation closure contains branching logic where multiple paths could set a callback, only the last one wins. This is documented, but worth noting that it may be surprising if the API gains more diverse callers. An alternative would be to collect callbacks into aVecand run them all, or return an error/log if a callback already exists.
There was a problem hiding this comment.
@claude it looks incorrect, because RocksStoreRWLoop handles catch_unwind while execution function on RW loop.
There was a problem hiding this comment.
Claude finished @ovr's task in 25s —— View job
You're right, I apologize for the incorrect suggestion. Looking at RocksStoreRWLoop::new (rocks_store.rs:841):
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(fun)) {
Err(panic_payload) => {
let restore_error = CubeError::from_panic_payload(panic_payload);
log::error!("Panic during read write loop execution: {}", restore_error);
}
Ok(res) => { ... }
}The entire RocksStoreRWLoopFn closure — which includes write_operation_impl and therefore the post-commit callback invocation — is already wrapped in catch_unwind. Adding another catch_unwind around the callback would be redundant. The "Must not panic" doc comment should instead just note that any panic will be caught and logged by the RW loop, but won't propagate to the caller.
Thanks for the correction.
| self.store | ||
| .write_operation_impl( | ||
| &self.store.rw_loop_default_cf, | ||
| op_name, | ||
| f, | ||
| Some(self.clone()), | ||
| ) | ||
| .await |
There was a problem hiding this comment.
Minor observation: self.clone() here clones the entire RocksMetaStore on every write operation. Since all fields are Arc-wrapped this is just reference count bumps (cheap), so it's fine. But it's worth noting that adding #[derive(Clone)] to RocksMetaStore widens the API surface — if any non-Arc field is added in the future, this clone could become expensive. A comment or a targeted clone_for_callback() method might make the intent clearer.
Also, accessing self.store.rw_loop_default_cf directly (instead of going through a method) requires the field to be pub(crate) — that visibility change is visible in rocks_store.rs. This is fine but creates a tighter coupling between the two modules.
There was a problem hiding this comment.
@claude how will it affect performance? As I can see, this struct containts 4 fields and all of them are under Arc(s), it should clone 4 Arc(s)?
There was a problem hiding this comment.
Claude finished @ovr's task in 34s —— View job
Performance impact of self.clone() on RocksMetaStore
- Read struct definition
- Analyze clone cost
- Post answer
Yes, you're right. RocksMetaStore has exactly 4 fields, all Arc-wrapped:
pub struct RocksMetaStore {
store: Arc<RocksStore>,
cached_tables: Arc<Mutex<Option<Arc<Vec<TablePath>>>>>,
disk_space_cache: Arc<RwLock<Option<(HashMap<String, u64>, SystemTime)>>>,
upload_loop: Arc<WorkerLoop>,
}So self.clone() is 4 atomic reference count increments — no heap allocations, no deep copies. Each Arc::clone is a single fetch_add (or equivalent) on the refcount's AtomicUsize, typically ~1–3 ns on modern hardware.
This happens once per write_operation call, well before the actual RocksDB commit (batch_write_rows). RocksDB write + fsync will dominate by 3–6 orders of magnitude (~milliseconds vs. nanoseconds), so the clone cost is immeasurable in practice. No performance concern here.
The only theoretical concern with frequent Arc::clone across threads is cache-line contention on the refcount, but since this is on the single RW-loop thread, that doesn't apply either.
…c post-commit callback API Introduce a generic `set_post_commit_callback` on `BatchPipe<S>` that fires after a successful RocksDB commit on the RW-loop thread. This replaces the hard-coded `invalidate_tables_cache` boolean flag, which was a hack that also ran before the actual commit. - `BatchPipe` is now generic over a store type `S` (defaults to `()`), allowing callbacks to receive a typed store reference - `RocksMetaStore::write_operation` passes `&RocksMetaStore` to callbacks, giving them direct access to `cached_tables` and other metastore state - Moved `cached_tables` from `RocksStore` to `RocksMetaStore` where it belongs - Cache invalidation now only happens when the commit actually succeeds - `RocksTable` methods are generic over `S` so they work with any `BatchPipe<S>`
678a124 to
222d512
Compare
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## master #10683 +/- ##
==========================================
+ Coverage 57.91% 58.01% +0.10%
==========================================
Files 215 215
Lines 16646 16648 +2
Branches 3347 3347
==========================================
+ Hits 9641 9659 +18
+ Misses 6516 6499 -17
- Partials 489 490 +1
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
|
Claude finished @ovr's task in 2m 54s —— View job Comprehensive Code Review: Post-commit callback API refactoring
Overall AssessmentApprove — This is a well-executed refactoring with a genuine correctness improvement at its core. The changes are clean, well-structured across 4 logical commits, and responsive to prior review feedback. Code Quality & CorrectnessThe core correctness fix is solid. The old code ran Generics are well-designed. Good response to review feedback. The follow-up commit ( The The Remaining Suggestion: Extract Duplicated CallbackThe 10 identical closures in production code remain: batch_pipe.set_post_commit_callback(|metastore| {
*metastore.cached_tables.lock().unwrap() = None;
});This could be a one-liner helper that reuses impl BatchPipe<'_, RocksMetaStore> {
pub fn invalidate_tables_cache_on_commit(&mut self) {
self.set_post_commit_callback(|ms| ms.reset_cached_tables());
}
}This is a minor suggestion — the current form is perfectly correct and the duplication is low-risk since all 10 sites are doing exactly the same thing. But if more callback types are added in the future, having a pattern for named helpers would be nice. SecurityNo security concerns. The callback mechanism operates entirely within the RW-loop thread. Performance
TestingGood test coverage added in commit
Minor gap: No test for the "callback overwrites previous callback" behavior, but this is documented in the doc comment and not critical. Documentation
Commit StructureClean 4-commit progression:
|
…c post-commit callback API
Introduce a generic
set_post_commit_callbackonBatchPipe<S>that fires after a successful RocksDB commit on the RW-loop thread. This replaces the hard-codedinvalidate_tables_cacheboolean flag, which was a hack that also ran before the actual commit.BatchPipeis now generic over a store typeS(defaults to()), allowing callbacks to receive a typed store referenceRocksMetaStore::write_operationpasses&RocksMetaStoreto callbacks, giving them direct access tocached_tablesand other metastore statecached_tablesfromRocksStoretoRocksMetaStorewhere it belongsRocksTablemethods are generic overSso they work with anyBatchPipe<S>